# Source Generated with Decompyle++
# File: in.pyc (Python 2.4)

"""Transaction objects manage resources for an individual activity.

Compatibility issues
--------------------

The implementation of Transaction objects involves two layers of
backwards compatibility, because this version of transaction supports
both ZODB 3 and ZODB 4.  Zope is evolving towards the ZODB4
interfaces.

Transaction has two methods for a resource manager to call to
participate in a transaction -- register() and join().  join() takes a
resource manager and adds it to the list of resources.  register() is
for backwards compatibility.  It takes a persistent object and
registers its _p_jar attribute.

TODO: explain adapter

Subtransactions
---------------

Note: Subtransactions are deprecated!  Use savepoint/rollback instead.

A subtransaction applies the transaction notion recursively.  It
allows a set of modifications within a transaction to be committed or
aborted as a group.  A subtransaction is a strictly local activity;
its changes are not visible to any other database connection until the
top-level transaction commits.  In addition to its use to organize a
large transaction, subtransactions can be used to optimize memory use.
ZODB must keep modified objects in memory until a transaction commits
and it can write the changes to the storage.  A subtransaction uses a
temporary disk storage for its commits, allowing modified objects to
be flushed from memory when the subtransaction commits.

The commit() and abort() methods take an optional subtransaction
argument that defaults to false.  If it is true, the operation is
performed on a subtransaction.

Subtransactions add a lot of complexity to the transaction
implementation.  Some resource managers support subtransactions, but
they are not required to.  (ZODB Connection is the only standard
resource manager that supports subtransactions.)
Resource managers that do support subtransactions implement
abort_sub() and commit_sub() methods and support a second argument to
tpc_begin().

The second argument to tpc_begin() indicates that a subtransaction
commit is beginning (if it is true).  In a subtransaction, there is no
tpc_vote() call, because sub-transactions don't need 2-phase commit.
If a sub-transaction abort or commit fails, we can abort the outer
transaction.  The tpc_finish() or tpc_abort() call applies just to
that subtransaction.

Once a resource manager is involved in a subtransaction, all
subsequent transactions will be treated as subtransactions until
abort_sub() or commit_sub() is called.  abort_sub() will undo all the
changes of the subtransactions.  commit_sub() will begin a top-level
transaction and store all the changes from subtransactions.  After
commit_sub(), the transaction must still call tpc_vote() and
tpc_finish().

If the resource manager does not support subtransactions, nothing
happens when the subtransaction commits.  Instead, the resource
manager is put on a list of managers to commit when the actual
top-level transaction commits.  If this happens, it will not be
possible to abort subtransactions.

Two-phase commit
----------------

A transaction commit involves an interaction between the transaction
object and one or more resource managers.  The transaction manager
calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of
them.

    1. tpc_begin(txn)
    2. commit(txn)
    3. tpc_vote(txn)
    4. tpc_finish(txn)

Subtransaction commit
---------------------

Note: Subtransactions are deprecated!

When a subtransaction commits, the protocol is different.

1. tpc_begin() is passed a second argument, which indicates that a
   subtransaction is being committed.
2. tpc_vote() is not called.

Once a subtransaction has been committed, the top-level transaction
commit will start with a commit_sub() call instead of a tpc_begin()
call.

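As a rough illustration of the call order described above, the
following toy sketch drives two stand-in resource managers through the
four phases and, on failure, takes the abort path (abort() for managers
that have not voted, then tpc_abort() for all of them).
ToyResourceManager and toy_commit are hypothetical names invented for
this example; the sketch ignores subtransactions, logging, and
synchronizers, and unlike the real cleanup code it does not swallow
errors raised while aborting.

```python
# Toy model of the call order only.  ToyResourceManager and toy_commit
# are hypothetical names, not part of this module.
class ToyResourceManager(object):
    def __init__(self, name, log, fail_on=None):
        self.name = name
        self.log = log
        self.fail_on = fail_on  # name of the method that should raise

    def _call(self, method):
        # Record the call, then fail if this is the configured method.
        self.log.append((method, self.name))
        if method == self.fail_on:
            raise RuntimeError('%s failed in %s' % (self.name, method))

    def sortKey(self):
        return self.name

    def tpc_begin(self, txn):
        self._call('tpc_begin')

    def commit(self, txn):
        self._call('commit')

    def tpc_vote(self, txn):
        self._call('tpc_vote')

    def tpc_finish(self, txn):
        self._call('tpc_finish')

    def abort(self, txn):
        self._call('abort')

    def tpc_abort(self, txn):
        self._call('tpc_abort')


def toy_commit(txn, managers):
    # Each phase runs over every manager before the next phase starts.
    ordered = sorted(managers, key=lambda rm: rm.sortKey())
    voted = set()
    try:
        for rm in ordered:
            rm.tpc_begin(txn)
        for rm in ordered:
            rm.commit(txn)
        for rm in ordered:
            rm.tpc_vote(txn)
            voted.add(id(rm))
        for rm in ordered:
            rm.tpc_finish(txn)
    except Exception:
        # Managers that have not voted get abort(); all get tpc_abort().
        for rm in ordered:
            if id(rm) not in voted:
                rm.abort(txn)
        for rm in ordered:
            rm.tpc_abort(txn)
        raise


log = []
toy_commit(None, [ToyResourceManager('b', log), ToyResourceManager('a', log)])
# log now lists tpc_begin for 'a' and 'b', then commit for both, then
# tpc_vote for both, then tpc_finish for both.
```

Passing fail_on='tpc_vote' to one of the toy managers shows the abort
path: the manager that failed to vote receives abort(), and both
managers then receive tpc_abort().
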
Before-commit hook
------------------

Sometimes, applications want to execute some code when a transaction is
committed.  For example, one might want to delay object indexing until a
transaction commits, rather than indexing every time an object is changed.
Or someone might want to check invariants only after a set of operations.  A
pre-commit hook is available for such use cases:  use addBeforeCommitHook(),
passing it a callable and arguments.  The callable will be called with its
arguments at the start of the commit (but not for subtransaction commits).

Error handling
--------------

When errors occur during two-phase commit, the transaction manager
aborts all the resource managers.  The specific methods it calls depend
on whether the error occurs before or after the call to tpc_vote() on
that resource manager.

If the resource manager has not voted, then the resource manager will
have one or more uncommitted objects.  There are two cases that lead
to this state; either the transaction manager has not called commit()
for any objects on this resource manager or the call that failed was a
commit() for one of the objects of this resource manager.  For each
uncommitted object, including the object that failed in its commit(),
call abort().

Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
called on each resource manager.

Synchronization
---------------

You can register synchronization objects (synchronizers) with the
transaction manager.  The synchronizer must implement
beforeCompletion() and afterCompletion() methods.  The transaction
manager calls beforeCompletion() when it starts a top-level two-phase
commit.  It calls afterCompletion() when a top-level transaction is
committed or aborted.  The methods are passed the current Transaction
as their only argument.
"""

import logging
import sys
import thread
import warnings
import weakref
import traceback
from cStringIO import StringIO

from zope import interface

from transaction import interfaces

_marker = object()


def myhasattr(obj, attr):
    # Unlike the Python 2 builtin hasattr(), this does not hide
    # exceptions other than AttributeError.
    return getattr(obj, attr, _marker) is not _marker


class Status:
    ACTIVE = 'Active'
    COMMITTING = 'Committing'
    COMMITTED = 'Committed'
    COMMITFAILED = 'Commit failed'


class Transaction(object):

    interface.implements(interfaces.ITransaction,
                         interfaces.ITransactionDeprecated)

    _savepoint_index = 0
    _savepoint2index = None
    _subtransaction_savepoint = None
    user = ''
    description = ''

    def __init__(self, synchronizers=None, manager=None):
        self.status = Status.ACTIVE
        self._resources = []
        if synchronizers is None:
            from ZODB.utils import WeakSet
            synchronizers = WeakSet()
        self._synchronizers = synchronizers
        self._manager = manager
        self._adapters = {}
        self._voted = {}
        self._extension = {}
        self.log = logging.getLogger('txn.%d' % thread.get_ident())
        self.log.debug('new transaction')
        self._failure_traceback = None
        self._before_commit = []

    def _prior_operation_failed(self):
        # Always raises; callers rely on this not returning.
        from ZODB.POSException import TransactionFailedError
        assert self._failure_traceback is not None
        raise TransactionFailedError(
            'An operation previously failed, with traceback:\n\n%s'
            % self._failure_traceback.getvalue())

    def join(self, resource):
        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed()
        if self.status is not Status.ACTIVE:
            raise ValueError("expected txn status %r, but it's %r"
                             % (Status.ACTIVE, self.status))
        if myhasattr(resource, 'prepare'):
            resource = DataManagerAdapter(resource)
        self._resources.append(resource)
        if self._savepoint2index:
            # Existing savepoints must be able to roll back the new
            # resource too; for a late joiner that means aborting it.
            datamanager_savepoint = AbortSavepoint(resource, self)
            for transaction_savepoint in self._savepoint2index.keys():
                transaction_savepoint._savepoints.append(
                    datamanager_savepoint)

    def savepoint(self, optimistic=False):
        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed()
        try:
            savepoint = Savepoint(self, optimistic, *self._resources)
        except:
            self._cleanup(self._resources)
            self._saveCommitishError()  # re-raises
        if self._savepoint2index is None:
            self._savepoint2index = weakref.WeakKeyDictionary()
        self._savepoint_index += 1
        self._savepoint2index[savepoint] = self._savepoint_index
        return savepoint

    def _remove_and_invalidate_after(self, savepoint):
        savepoint2index = self._savepoint2index
        index = savepoint2index[savepoint]
        # items() returns a list copy in Python 2, so deleting while
        # looping is safe here.
        for savepoint, i in savepoint2index.items():
            if i > index:
                savepoint.transaction = None  # invalidate
                del savepoint2index[savepoint]

    def _invalidate_all_savepoints(self):
        for savepoint in self._savepoint2index.keys():
            savepoint.transaction = None  # invalidate
        self._savepoint2index.clear()

    def register(self, obj):
        manager = getattr(obj, '_p_jar', obj)
        if manager is None:
            raise ValueError('Register with no manager')
        adapter = self._adapters.get(manager)
        if adapter is None:
            adapter = MultiObjectResourceAdapter(manager)
            adapter.objects.append(obj)
            self._adapters[manager] = adapter
            self.join(adapter)
        else:
            assert id(obj) not in map(id, adapter.objects)
            adapter.objects.append(obj)

    def commit(self, subtransaction=_marker, deprecation_wng=True):
        if subtransaction is _marker:
            subtransaction = 0
        elif deprecation_wng:
            from ZODB.utils import deprecated37
            deprecated37('subtransactions are deprecated; instead of '
                         'transaction.commit(1), use '
                         'transaction.savepoint(optimistic=True) in '
                         'contexts where a subtransaction abort will '
                         'never occur, or sp=transaction.savepoint() if '
                         'later rollback is possible and then '
                         'sp.rollback() instead of transaction.abort(1)')
        if self._savepoint2index:
            self._invalidate_all_savepoints()
        if subtransaction:
            # A subtransaction commit is emulated with a savepoint.
            self._subtransaction_savepoint = self.savepoint(optimistic=True)
            return
        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed()
        self._callBeforeCommitHooks()
        self._synchronizers.map(lambda s: s.beforeCompletion(self))
        self.status = Status.COMMITTING
        try:
            self._commitResources()
        except:
            self._saveCommitishError()  # re-raises
        self.status = Status.COMMITTED
        if self._manager:
            self._manager.free(self)
        self._synchronizers.map(lambda s: s.afterCompletion(self))
        self.log.debug('commit')

    def _saveCommitishError(self):
        self.status = Status.COMMITFAILED
        # Save the traceback so _prior_operation_failed() can report it.
        ft = self._failure_traceback = StringIO()
        t, v, tb = sys.exc_info()
        # Stack of the caller, then the traceback of the exception,
        # then the exception type and value.
        traceback.print_stack(sys._getframe(1), None, ft)
        traceback.print_tb(tb, None, ft)
        ft.writelines(traceback.format_exception_only(t, v))
        raise t, v, tb

    def getBeforeCommitHooks(self):
        return iter(self._before_commit)

    def addBeforeCommitHook(self, hook, args=(), kws=None):
        if kws is None:
            kws = {}
        self._before_commit.append((hook, tuple(args), kws))

    def beforeCommitHook(self, hook, *args, **kws):
        from ZODB.utils import deprecated38
        deprecated38('Use addBeforeCommitHook instead of beforeCommitHook.')
        self.addBeforeCommitHook(hook, args, kws)

    def _callBeforeCommitHooks(self):
        for hook, args, kws in self._before_commit:
            hook(*args, **kws)
        self._before_commit = []

    def _commitResources(self):
        # Execute the two-phase commit protocol.
        L = list(self._resources)
        L.sort(rm_cmp)
        try:
            for rm in L:
                rm.tpc_begin(self)
            for rm in L:
                rm.commit(self)
                self.log.debug('commit %r' % rm)
            for rm in L:
                rm.tpc_vote(self)
                self._voted[id(rm)] = True
            try:
                for rm in L:
                    rm.tpc_finish(self)
            except:
                self.log.critical('A storage error occurred during the '
                                  'second phase of the two-phase commit. '
                                  'Resources may be in an inconsistent '
                                  'state.')
                raise
        except:
            # Revert the resource managers, notify the synchronizers,
            # then re-raise the original error.
            t, v, tb = sys.exc_info()
            try:
                self._cleanup(L)
            finally:
                self._synchronizers.map(lambda s: s.afterCompletion(self))
            raise t, v, tb

    def _cleanup(self, L):
        # Abort every resource manager that has not voted, then call
        # tpc_abort() on all of them.  Errors are logged, not raised.
        for rm in L:
            if id(rm) not in self._voted:
                try:
                    rm.abort(self)
                except Exception:
                    self.log.error('Error in abort() on manager %s',
                                   rm, exc_info=sys.exc_info())
        for rm in L:
            try:
                rm.tpc_abort(self)
            except Exception:
                self.log.error('Error in tpc_abort() on manager %s',
                               rm, exc_info=sys.exc_info())

    def abort(self, subtransaction=_marker, deprecation_wng=True):
        if subtransaction is _marker:
            subtransaction = 0
        elif deprecation_wng:
            from ZODB.utils import deprecated37
            deprecated37('subtransactions are deprecated; use '
                         'sp.rollback() instead of transaction.abort(1), '
                         'where `sp` is the corresponding savepoint '
                         'captured earlier')
        if subtransaction:
            if not self._subtransaction_savepoint:
                raise interfaces.InvalidSavepointRollbackError
            if self._subtransaction_savepoint.valid:
                self._subtransaction_savepoint.rollback()
                # Invalidate the savepoint so a repeated abort(1) has
                # no further effect.
                self._subtransaction_savepoint.transaction = None
                assert not self._subtransaction_savepoint.valid
            return
        if self._savepoint2index:
            self._invalidate_all_savepoints()
        self._synchronizers.map(lambda s: s.beforeCompletion(self))
        tb = None
        for rm in self._resources:
            try:
                rm.abort(self)
            except:
                # Remember the first error; keep aborting the rest.
                if tb is None:
                    t, v, tb = sys.exc_info()
                self.log.error('Failed to abort resource manager: %s',
                               rm, exc_info=sys.exc_info())
        if self._manager:
            self._manager.free(self)
        self._synchronizers.map(lambda s: s.afterCompletion(self))
        self.log.debug('abort')
        if tb is not None:
            raise t, v, tb

    def note(self, text):
        text = text.strip()
        if self.description:
            self.description += '\n\n' + text
        else:
            self.description = text

    def setUser(self, user_name, path='/'):
        self.user = '%s %s' % (path, user_name)

    def setExtendedInfo(self, name, value):
        self._extension[name] = value


class MultiObjectResourceAdapter(object):
    '''Adapt the old-style register() call to the new-style join().

    With join(), a resource manager like a Connection registers with the
    transaction manager.  With register(), an individual object is
    passed to register().
    '''

    def __init__(self, jar):
        self.manager = jar
        self.objects = []
        self.ncommitted = 0

    def __repr__(self):
        return '<%s for %s at %s>' % (self.__class__.__name__,
                                      self.manager, id(self))

    def sortKey(self):
        return self.manager.sortKey()

    def tpc_begin(self, txn):
        self.manager.tpc_begin(txn)

    def tpc_finish(self, txn):
        self.manager.tpc_finish(txn)

    def tpc_abort(self, txn):
        self.manager.tpc_abort(txn)

    def commit(self, txn):
        for o in self.objects:
            self.manager.commit(o, txn)
            self.ncommitted += 1

    def tpc_vote(self, txn):
        self.manager.tpc_vote(txn)

    def abort(self, txn):
        tb = None
        for o in self.objects:
            try:
                self.manager.abort(o, txn)
            except:
                # Remember the first error and re-raise it after all
                # the other objects have been aborted.
                if tb is None:
                    t, v, tb = sys.exc_info()
                txn.log.error('Failed to abort object: %s',
                              object_hint(o), exc_info=sys.exc_info())
        if tb is not None:
            raise t, v, tb


def rm_cmp(rm1, rm2):
    return cmp(rm1.sortKey(), rm2.sortKey())


def object_hint(o):
    '''Return a string describing the object.

    This function does not raise an exception.
    '''
    from ZODB.utils import oid_repr
    klass = o.__class__.__name__
    # _p_oid is only looked up with a default, so a missing attribute
    # does not raise.
    oid = getattr(o, '_p_oid', _marker)
    if oid is not _marker:
        oid = oid_repr(oid)
    return '%s oid=%s' % (klass, oid)


class DataManagerAdapter(object):
    '''Adapt zodb 4-style data managers to zodb3 style

    Adapt transaction.interfaces.IDataManager to
    ZODB.interfaces.IPureDatamanager
    '''

    def __init__(self, datamanager):
        self._datamanager = datamanager

    def commit(self, transaction):
        pass

    def abort(self, transaction):
        self._datamanager.abort(transaction)

    def tpc_begin(self, transaction):
        pass

    def tpc_abort(self, transaction):
        self._datamanager.abort(transaction)

    def tpc_finish(self, transaction):
        self._datamanager.commit(transaction)

    def tpc_vote(self, transaction):
        self._datamanager.prepare(transaction)

    def sortKey(self):
        return self._datamanager.sortKey()


class Savepoint:
    '''Transaction savepoint.

    Transaction savepoints coordinate savepoints for data managers
    participating in a transaction.
    '''

    interface.implements(interfaces.ISavepoint)

    valid = property(lambda self: self.transaction is not None)

    def __init__(self, transaction, optimistic, *resources):
        self.transaction = transaction
        self._savepoints = savepoints = []
        for datamanager in resources:
            try:
                savepoint = datamanager.savepoint
            except AttributeError:
                if not optimistic:
                    raise TypeError('Savepoints unsupported', datamanager)
                savepoint = NoRollbackSavepoint(datamanager)
            else:
                savepoint = savepoint()
            savepoints.append(savepoint)

    def rollback(self):
        transaction = self.transaction
        if transaction is None:
            raise interfaces.InvalidSavepointRollbackError
        transaction._remove_and_invalidate_after(self)
        try:
            for savepoint in self._savepoints:
                savepoint.rollback()
        except:
            # Mark the transaction as failed.
            transaction._saveCommitishError()  # re-raises


class AbortSavepoint:

    def __init__(self, datamanager, transaction):
        self.datamanager = datamanager
        self.transaction = transaction

    def rollback(self):
        self.datamanager.abort(self.transaction)


class NoRollbackSavepoint:

    def __init__(self, datamanager):
        self.datamanager = datamanager

    def rollback(self):
        raise TypeError('Savepoints unsupported', self.datamanager)